{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 12.4 分布式TensorFlow\n",
    "通过多GPU并行的方式可以达到很好的加速效果。然而一台机器上能够安装的GPU有限，要进一步提升深度学习模型的训练速度，就需要**将TensorFlow分布式运行在多台机器上**。本节将介绍如何编写以及运行分布式TensorFlow的程序。\n",
    "\n",
    "首先，在12.4.1节中将介绍分布式TensorFlow的工作原理，并给出最简单的分布式TensorFlow样例程序，在这一节中也将介绍不同的TensorFlow分布式方式。然后，在12.4.2节中将给出两个完整的TensorFlow样例程序来同步或者异步地训练深度学习模型。\n",
    "\n",
    "### 12.4.1 分布式TensorFlow原理\n",
    "在12.2节中介绍了分布式TensorFlow训练深度学习模型的理论。本节将具体介绍如何使用TensorFlow在分布式集群中训练深度学习模型。以下代码展示了如何创建一个最简单的TensorFlow集群："
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "b'Hello, distributed TensorFlow!'\n"
     ]
    }
   ],
   "source": [
    "import tensorflow as tf\n",
    "\n",
    "c = tf.constant(\"Hello, distributed TensorFlow!\")\n",
    "\n",
    "# 创建一个本地集群。\n",
    "server = tf.train.Server.create_local_server()\n",
    "sess = tf.Session(server.target)\n",
    "print(sess.run(c))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "在以上代码中，首先通过`tf.train.Server.create_local_server`函数在本地建立了一个只有一台机器的TensorFlow集群。然后在该集群上生成了一个会话，并通过生成的会话将运算运行在创建的TensorFlow集群上。虽然这只是一个单机集群，但它大致反映了TensorFlow集群的工作流程。**TensorFlow集群通过一系列的任务（tasks）来执行TensorFlow计算图中的运算。**一般来说，不同任务跑在不同机器上。最主要的例外是使用GPU时，不同任务可以使用同一台机器上的不同GPU。**TensorFlow集群中的任务也会被聚合成工作（jobs），每个工作可以包含一个或者多个任务。**比如在训练、深度学习模型时，一台运行反向传播的机器是一个任务，而所有运行反向传播机器的集合是一种工作。\n",
    "\n",
    "以上样例代码是只有一个任务的集群。当一个TensorFlow集群有多个任务时，需要使用`tf.train.ClusterSpec`来指定运行每一个任务的机器。比如以下代码展示了在本地运行有两个任务的TensorFlow集群。\n",
    "\n",
    "第一个任务的代码如下："
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import tensorflow as tf\n",
    "\n",
    "c = tf.constant(\"Hello from server1!\")\n",
    "\n",
    "# 生成一个有两个任务的集群，一个任务跑在本地2222端口，另外一个跑在本地2223端口。\n",
    "cluster = tf.train.ClusterSpec({\"local\": [\"localhost:2222\", \"localhost:2223\"]})\n",
    "# 通过上面生成的集群配置生成Server，并通过job_name和task_index指定当前所启动\n",
    "# 的任务。因为该任务是第一个任务，所以task_index为0。\n",
    "server = tf.train.Server(cluster, job_name=\"local\", task_index=0)\n",
    "\n",
    "sess = tf.Session(server.target, config=tf.ConfigProto(log_device_placement=True)) \n",
    "print(sess.run(c))\n",
    "server.join()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "第二个任务的代码如下："
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import tensorflow as tf\n",
    "\n",
    "c = tf.constant(\"Hello from server2!\")\n",
    "\n",
    "# 和第一个程序一样的集群配置。集群中的每一个任务需要采用相同的配置。\n",
    "cluster = tf.train.ClusterSpec({\"local\": [\"localhost:2222\", \"localhost:2223\"]})\n",
    "# 指定task_index为1，所以这个程序将在localhost:2223 启动服务。\n",
    "server = tf.train.Server(cluster, job_name=\"local\", task_index=1)\n",
    "\n",
    "sess = tf.Session(server.target, config=tf.ConfigProto(log_device_placement=True)) \n",
    "print(sess.run(c))\n",
    "server.join()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "启动第一个任务后，可以得到类似下面的输出："
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "'''\n",
    "tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:215] Initialize\n",
    "GrpcChannelCache for job local -> {0 -> localhost:2222 , 1 -> localhost:2223}\n",
    "tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:316) Started\n",
    "server with target: grpc://localhost:2222\n",
    "tensorflow/core/distributed_runtime/master.cc:209] CreateSession still\n",
    "waiting for response from worker: /job:local/replica:0/task:1\n",
    "tensorflow/core/distributed runtime/master.cc:209] CreateSession still\n",
    "waiting for response from worker: /job:local/replica:0/task:1\n",
    "tensorflow/core/distributed runtime/master session.cc:998) Start master\n",
    "session 67a422339d0b7833 with config: log_device_placement: true\n",
    "Const : (Const): /job:local/replica:0/task:0/cpu:0\n",
    "tensorflow/core/common_runtime/simple_placer.cc:872] Const :\n",
    "(Const)/job:local/replica:0/task:0/cpu:0\n",
    "Hello from server1!\n",
    "'''"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "从第一个任务的输出中可以看到，当只启动第一个任务时，程序会停下来等待第二个任务启动，而且将持续输出CreateSession still waiting for response from worker:/job:local/replica:0/task:1。当第二个任务启动后，可以看到从第一个任务中会输出Hello from server1！的结果。第二个任务的输出如下："
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "'''\n",
    "tensorflow/core/distributed_runtime/rpc/grp_channel.cc: 215] Initialize\n",
    "GrpcChannelCache for job local -> {0 - > localhost:2222 , 1 -> localhost:2223}\n",
    "tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:316] Started\n",
    "server with target: grpc://localhost:2223\n",
    "tensorflow/core/distributed_runtime/master_session.cc:998] Start master\n",
    "session 79bb86bdlf7ecf4f with config: log device placement: true\n",
    "Const : (Const): /job:local/replica:0/task:0/cpu:0\n",
    "tensorflow/core/common_runtime/simple_placer.cc:872] Const:\n",
    "(Const)/job:local/replica:0/task:0/cpu:0\n",
    "Hello from server2!\n",
    "'''"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "值得注意的是第二个任务中定义的计算也被放在了设备/job:local/replica:0/task:0/cpu:0上。也就是说这个计算将由第一个任务来执行。从上面这个样例可以看到，通过tf.train.Server.target生成的会话可以统一管理整个TensorFlow集群中的资源。\n",
    "\n",
    "**本人自己的实验：**由于这里设置了log_device_placement=True，同样需要在命令行运行上述代码(本文件目录下distribution_a.py和distribution_a.py)，最终得到如下图输出：\n",
    "<p align='center'>\n",
    "    <img src=images/图12.12.JPG>\n",
    "    <center>图12-12 只启动第一个任务后的输出</center>\n",
    "    <img src=images/图12.13.JPG>\n",
    "    <center>图12-13 启动第二个任务后的二者输出</center>\n",
    "</p>"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "**指定操作运算的任务：**和使用多GPU类似，TensorFlow支持通过`tf.device`来指定操作运行在哪个任务上。比如将第二个任务中定义计算的语句改为以下代码，就可以看到这个计算将被调度到/job:local/replica:0/task:1/cpu:0上面（原本是：/job:local/replica:0/task:0/cpu:0）。"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "'''\n",
    "with tf.device(\"/job:local/task:1\"):\n",
    "    c = tf.constant(\"Hello from server2!\")\n",
    "'''"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "**多个工作：**在以上样例中只定义了一个工作“local” 。但一般在训练深度学习模型时，会定义两个工作:\n",
    "- 一个工作专门负责存储、获取以及更新变量的取值，这个工作所包含的任务统称为**参数服务器**（parameter server, ps）;\n",
    "- 另外一个工作负责运行反向传播算法来获取参数梯度，这个工作所包含的任务统称为**计算服务器**（worker）。\n",
    "\n",
    "下面给出了一个比较常见的用于训练深度学习模型的TensorFlow集群配置方法（注意这里给出的worker(i)和tf-ps(i)都是服务器地址）。"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "'''\n",
    "tf.train.ClusterSpec({\n",
    "    \"worker\": [\n",
    "        \"tf-worker0:2222\",\n",
    "        \"tf-worker1:2222\",\n",
    "        \"tf-worker2:2222\"\n",
    "    ],\n",
    "    \"ps\": [\n",
    "        \"tf-ps0:2222\",\n",
    "        \"tf-ps0:2222\"\n",
    "    ]})\n",
    "'''"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "使用分布式TensorFlow训练深度学习模型一般有两种方式:\n",
    "- 一种方式叫做**计算图内分布式（in-graph replication)。**使用这种分布式训练方式时，**所有的任务都会使用一个TensorFlow计算图中的变量（也就是深度学习模型中的参数），而只是将计算部分发布到不同的计算服务器上。**12.3节中给出的使用多GPU样例程序就是这种方式。多GPU样例程序将计算复制了多份，每一份放到一个GPU上进行运算。但不同的GPU使用的参数都是在一个TensorFlow计算图中的。因为参数都是存在同一个计算图中，所以同步更新参数比较容易控制。在12.3节中给出的代码也实现了参数的同步更新。然而因为计算图内分布式需要有一个中心节点来生成这个计算图井分配计算任务，所以当数据量太大时，这个中心节点容易造成性能瓶颈。\n",
    "- 另一种方式叫**计算图之间分布式( between-graph replication ）。**使用这种分布式方式时，**在每一个计算服务器上都会创建一个独立的TensorFlow计算图，但不同计算图中的相同参数需要以一种固定的方式放到同一个参数服务器上**。TensorFlow提供了`tf.train.replica_device_setter`函数来帮助完成这一个过程，在12.4.2节中将给出具体的样例。因为每个计算服务器的TensorFlow计算图是独立的，所以这种方式的并行度要更高。但在计算图之间分布式下进行参数的同步更新比较困难。为了解决这个问题，TensorFlow提供了`tf.train.SyncReplicasOptimizer`函数来帮助实现参数的同步更新。这让计算图之间分布式方式被**更加广泛地使用**。在12.4.2节中将给出使用计算图之间分布式的样例程序来实现异步模式和同步模式的并行化深度学习模型训练过程。\n",
    "\n",
    "### 12.4.2 分布式TensorFlow模型训练\n",
    "本节中将给出两个样例程序分别实现使用计算图之间分布式（Between-graph replication）完成分布式深度学习模型训练的异步更新和同步更新。\n",
    "\n",
    "- 第一部分将给出使用计算图之间分布式实现**异步更新**的TensorFlow程序。这一部分也会给出具体的命令行将该程序分布式的运行在一个参数服务器和两个计算服务器上，并通过TensorBoard可视化在第一个计算服务器上的TensorFlow计算图。\n",
    "- 第二部分将给出计算图之间分布式实现**同步参数更新**的TensorFlow程序。同步参数更新的代码大部分和异步更新相似，所以在这一部分中将重点介绍它们之间的不同之处。\n",
    "\n",
    "**1. 异步模式样例程序**\n",
    "\n",
    "以下样例代码将仍然采用5.5节中给出的模式，并复用5.5节mnist_inference.py程序中定义的前向传播算法。以下代码实现了异步模式的分布式神经网络训练过程(当然，12.4.2节的程序限于条件，没有实际运行)："
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import time\n",
    "import tensorflow as tf\n",
    "from tensorflow.examples.tutorials.mnist import input_data\n",
    "\n",
    "import mnist_inference\n",
    "\n",
    "# 配置神经网络的参数。\n",
    "BATCH_SIZE = 100\n",
    "LEARNING_RATE_BASE = 0.01\n",
    "LEARNING_RATE_DECAY = 0.99\n",
    "REGULARAZTION_RATE = 0.0001\n",
    "TRAINING_STEPS = 20000\n",
    "MOVING_AVERAGE_DECAY = 0.99\n",
    "\n",
    "# 模型保存的路径。\n",
    "MODEL_SAVE_PATH = \"logs/log_async\"\n",
    "# MNIST数据路径。\n",
    "DATA_PATH = \"../../datasets/MNIST_data\"\n",
    "\n",
    "# 通过flags指定运行的参数。在12.4.1小节中对于不同的任务（task）给出了不同的程序，\n",
    "# 但这不是一种可扩展的方式。在这一小节中将使用运行程序时给出的参数来配置在不同\n",
    "# 任务中运行的程序。\n",
    "FLAGS = tf.app.flags.FLAGS\n",
    "\n",
    "# 指定当前运行的是参数服务器还是计算服务器。参数服务器只负责TensorFlow中变量的维护\n",
    "# 和管理，计算服务器负责每一轮迭代时运行反向传播过程。\n",
    "tf.app.flags.DEFINE_string('job_name', 'worker', ' \"ps\" or \"worker\" ')\n",
    "# 指定集群中的参数服务器地址。\n",
    "tf.app.flags.DEFINE_string(\n",
    "    'ps_hosts', ' tf-ps0:2222,tf-ps1:1111',\n",
    "    'Comma-separated list of hostname:port for the parameter server jobs. e.g. \"tf-ps0:2222,tf-ps1:1111\" ')\n",
    "# 指定集群中的计算服务器地址。\n",
    "tf.app.flags.DEFINE_string(\n",
    "    'worker_hosts', ' tf-worker0:2222,tf-worker1:1111',\n",
    "    'Comma-separated list of hostname:port for the worker jobs. e.g. \"tf-worker0:2222,tf-worker1:1111\" ')\n",
    "# 指定当前程序的任务ID。TensorFlow会自动根据参数服务器/计算服务器列表中的端口号\n",
    "# 来启动服务。注意参数服务器和计算服务器的编号都是从0开始的。\n",
    "tf.app.flags.DEFINE_integer('task_id', 0, 'Task ID of the worker/replica running the training.')\n",
    "\n",
    "# 定义TensorFlow的计算图，并返回每一轮迭代时需要运行的操作。这个过程和5.5节中的主\n",
    "# 函数基本一致，但为了使处理分布式计算的部分更加突出，本小节将此过程整理为一个函数。\n",
    "def build_model(x, y_, is_chief):\n",
    "    regularizer = tf.contrib.layers.l2_regularizer(REGULARAZTION_RATE)\n",
    "    # 通过和5.5节给出的mnist_inference.py代码计算神经网络前向传播的结果。\n",
    "    y = mnist_inference.inference(x, regularizer)\n",
    "    global_step = tf.contrib.framework.get_or_create_global_step()\n",
    "\n",
    "    # 计算损失函数并定义反向传播过程。\n",
    "    cross_entropy = tf.nn.sparse_softmax_cross_entropy_with_logits(logits=y, labels=tf.argmax(y_, 1))\n",
    "    cross_entropy_mean = tf.reduce_mean(cross_entropy)\n",
    "    loss = cross_entropy_mean + tf.add_n(tf.get_collection('losses'))\n",
    "    learning_rate = tf.train.exponential_decay(\n",
    "        LEARNING_RATE_BASE,\n",
    "        global_step,\n",
    "        60000 / BATCH_SIZE,\n",
    "        LEARNING_RATE_DECAY)\n",
    "    \n",
    "    train_op = tf.train.GradientDescentOptimizer(learning_rate).minimize(\n",
    "        loss, global_step=global_step)\n",
    "\n",
    "    # 定义每一轮迭代需要运行的操作。\n",
    "    if is_chief:\n",
    "        # 计算变量的滑动平均值。   \n",
    "        variable_averages = tf.train.ExponentialMovingAverage(\n",
    "            MOVING_AVERAGE_DECAY, global_step)\n",
    "        variables_averages_op = variable_averages.apply(\n",
    "            tf.trainable_variables())\n",
    "        with tf.control_dependencies([variables_averages_op, train_op]):\n",
    "            train_op = tf.no_op()\n",
    "    return global_step, loss, train_op\n",
    "\n",
    "def main(argv=None):\n",
    "    # 解析flags并通过tf.train.ClusterSpec配置TensorFlow集群。\n",
    "    ps_hosts = FLAGS.ps_hosts.split(',')\n",
    "    worker_hosts = FLAGS.worker_hosts.split(',')\n",
    "    cluster = tf.train.ClusterSpec({\"ps\": ps_hosts, \"worker\": worker_hosts})\n",
    "    # 通过tf.train.ClusterSpec以及当前任务创建tf.train.Server。\n",
    "    server = tf.train.Server(cluster,\n",
    "                             job_name=FLAGS.job_name,\n",
    "                             task_index=FLAGS.task_id)\n",
    "\n",
    "    # 参数服务器只需要管理TensorFlow中的变量，不需要执行训练的过程。server.join()会\n",
    "    # 一致停在这条语句上。\n",
    "    if FLAGS.job_name == 'ps':\n",
    "        with tf.device(\"/cpu:0\"):\n",
    "            server.join()\n",
    "\n",
    "    # 定义计算服务器需要运行的操作。\n",
    "    is_chief = (FLAGS.task_id == 0)\n",
    "    mnist = input_data.read_data_sets(DATA_PATH, one_hot=True)\n",
    "\n",
    "    # 通过tf.train.replica_device_setter函数来指定执行每一个运算的设备。\n",
    "    # tf.train.replica_device_setter函数会自动将所有的参数分配到参数服务器上，而\n",
    "    # 计算分配到当前的计算服务器上。图12-9展示了通过TensorBoard可视化得到的第一个计\n",
    "    # 算服务器上运算分配的结果。\n",
    "    device_setter = tf.train.replica_device_setter(\n",
    "        worker_device=\"/job:worker/task:%d\" % FLAGS.task_id,\n",
    "        cluster=cluster)\n",
    "    \n",
    "    with tf.device(device_setter):\n",
    "        # 定义输入并得到每一轮迭代需要运行的操作。\n",
    "        x = tf.placeholder(tf.float32, [None, mnist_inference.INPUT_NODE], name='x-input')\n",
    "        y_ = tf.placeholder(tf.float32, [None, mnist_inference.OUTPUT_NODE], name='y-input')\n",
    "        global_step, loss, train_op = build_model(x, y_, is_chief)\n",
    "\n",
    "        hooks=[tf.train.StopAtStepHook(last_step=TRAINING_STEPS)]\n",
    "        sess_config = tf.ConfigProto(allow_soft_placement=True,\n",
    "                                     log_device_placement=False)\n",
    "\n",
    "        # 通过tf.train.MonitoredTrainingSession管理训练深度学习模型的通用功能。\n",
    "        with tf.train.MonitoredTrainingSession(master=server.target,\n",
    "                                               is_chief=is_chief,\n",
    "                                               checkpoint_dir=MODEL_SAVE_PATH,\n",
    "                                               hooks=hooks,\n",
    "                                               save_checkpoint_secs=60,\n",
    "                                               config=sess_config) as mon_sess:\n",
    "            print(\"session started.\")\n",
    "            step = 0\n",
    "            start_time = time.time()\n",
    "\n",
    "            # 执行迭代过程。在迭代过程中tf.train.MonitoredTrainingSession会帮助完成初始\n",
    "            # 化、从checkpoint中加载训练过的模型、输出日志并保存模型， 所以下面的程序中不需要\n",
    "            # 在调用这些过程。tf.train.StopAtStepHook会帮忙判断是否需要退出。\n",
    "            while not mon_sess.should_stop():                \n",
    "                xs, ys = mnist.train.next_batch(BATCH_SIZE)\n",
    "                _, loss_value, global_step_value = mon_sess.run(\n",
    "                    [train_op, loss, global_step], feed_dict={x: xs, y_: ys})\n",
    "\n",
    "                # 每隔一段时间输出训练信息。不同的计算服务器都会更新全局的训练轮数，所以这里使用\n",
    "                # global_step_value得到在训练中使用过的batch的总数。\n",
    "                if step > 0 and step % 100 == 0:\n",
    "                    duration = time.time() - start_time\n",
    "                    sec_per_batch = duration / global_step_value\n",
    "                    format_str = \"After %d training steps (%d global steps), \" +\\\n",
    "                                 \"loss on training batch is %g. (%.3f sec/batch)\"\n",
    "                    print(format_str % (step, global_step_value, loss_value, sec_per_batch))\n",
    "                step += 1\n",
    "\n",
    "if __name__ == \"__main__\":\n",
    "    tf.app.run()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "假设上面代码的文件名为mnist_distribution_async.py，那么要启动一个拥有一个参数服务器、两个计算服务器的集群，需要先在运行参数服务器的机器上启动以下命令（这里tf-ps0、tf-worker0和tf-worker1都是服务器地址，如果在本地运行，可以使用localhost:2222、\n",
    "localhost:2223、localhost:2224代替）："
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "'''\n",
    "python mnist_distribution_async.py \\\n",
    "--job_name='ps' \\\n",
    "--task_id=0 \\\n",
    "--ps_hosts='tf-ps0:2222' \\\n",
    "--worker_hosts='tf-worker0:2222,tf-worker1:2222'\n",
    "'''"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "然后在运行第一个计算服务器的机器上启动以下命令："
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "'''\n",
    "python mnist_distribution_async.py \\\n",
    "--job_name='worker' \\\n",
    "--task_id=0 \\\n",
    "--ps_hosts='tf-ps0:2222' \\\n",
    "--worker_hosts='tf-worker0:2222,tf-worker1:2222'\n",
    "'''"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "最后在运行第二个计算服务器的机器上启动以下命令："
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "'''\n",
    "python mnist_distribution_async.py \\\n",
    "--job_name='worker' \\\n",
    "--task_id=1 \\\n",
    "--ps_hosts='tf-ps0:2222' \\\n",
    "--worker_hosts='tf-worker0:2222,tf-worker1:2222'\n",
    "'''"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "在启动第一个计算服务器之后，这个计算服务器就会尝试连接其他的服务器（包括计算服务器和参数服务器〉。如果其他服务器还没有启动，则被启动的计算服务器会提示等待连接其他服务器。以下代码展示了一个预警信息。\n",
    "\n",
    "`tensorflow/core/distributed_runtime/master.cc:209] CreateSession still waiting for response from worker: /job:worker/replica:0/task:1`\n",
    "\n",
    "不过这不会影响TensorFlow集群的启动。当TensorFlow集群中所有服务器都被启动之后，每一个计算服务器将不再预警。在TensorFlow集群完全启动之后，训练过程将被执行。图12.14展示了第一个计算服务器的TensorFlow计算图，可以看出，神经网络中定义的参数被放在了参数服务器上（图中浅灰色节点），而反向传播的计算过程则放在了当前的计算服务器上（图中的深灰色节点） 。\n",
    "<p align='center'>\n",
    "    <img src=images/图12.14.JPG>\n",
    "    <center>图12.14 通过TensorBoard可视化分布式TensorFlow计算图</center>\n",
    "</p>\n",
    "\n",
    "在计算服务器训练神经网络的过程中，第一个计算服务器会输出类似下面的信息:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "'''\n",
    "After 100 training steps (100 global steps) , loss on training batch is\n",
    "0.302718. (0.039 sec/batch)\n",
    "After 200 training steps (200 global steps) , loss on training batch is\n",
    "0.269476. (0.037 sec/batch)\n",
    "After 300 training steps (300 global steps) , loss on training batch is\n",
    "0.286755. (0.037 sec/batch)\n",
    "After 400 training steps (463 global steps) , loss on training batch is\n",
    "0.349983. (0.033 sec/batch)\n",
    "After 500 training steps (666 global steps) , loss on training batch is\n",
    "0.229955. (0.029 sec/batch )\n",
    "After 600 training steps (873 global steps) , loss on training batch is\n",
    "0.245588. (0.027 sec/batch)\n",
    "'''"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "第一个计算服务器会输出类似下面的信息:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "'''\n",
    "After 100 training steps (537 global steps) , loss on training batch is\n",
    "0.223165. (0.007 sec/batch)\n",
    "After 200 training steps (732 global steps) , loss on training batch is\n",
    "0.186126. (0.010 sec/batch)\n",
    "After 300 training steps (925 global steps) , loss on training batch is\n",
    "0.228191. (0.012 sec/batch)\n",
    "'''"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "从输出的信息中可以看到，在第二个计算服务器启动之前，第一个计算服务器己经运行了很多轮迭代了。在异步模式下，即使有计算服务器没有正常工作，参数更新的过程仍可继续，而且全局的迭代轮数是所有计算服务器迭代轮数的和。\n",
    "\n",
    "**2. 同步模型训练样例**\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import time\n",
    "import tensorflow as tf\n",
    "from tensorflow.examples.tutorials.mnist import input_data\n",
    "\n",
    "import mnist_inference\n",
    "\n",
    "# 配置神经网络的参数。\n",
    "BATCH_SIZE = 100\n",
    "LEARNING_RATE_BASE = 0.01\n",
    "LEARNING_RATE_DECAY = 0.99\n",
    "REGULARAZTION_RATE = 0.0001\n",
    "TRAINING_STEPS = 20000\n",
    "MOVING_AVERAGE_DECAY = 0.99\n",
    "\n",
    "MODEL_SAVE_PATH = \"logs/log_sync\"\n",
    "DATA_PATH = \"../../datasets/MNIST_data\"\n",
    "\n",
    "# 和异步模式类似的设置flags。\n",
    "FLAGS = tf.app.flags.FLAGS\n",
    "\n",
    "tf.app.flags.DEFINE_string('job_name', 'worker', ' \"ps\" or \"worker\" ')\n",
    "tf.app.flags.DEFINE_string(\n",
    "    'ps_hosts', ' tf-ps0:2222,tf-ps1:1111',\n",
    "    'Comma-separated list of hostname:port for the parameter server jobs. e.g. \"tf-ps0:2222,tf-ps1:1111\" ')\n",
    "tf.app.flags.DEFINE_string(\n",
    "    'worker_hosts', ' tf-worker0:2222,tf-worker1:1111',\n",
    "    'Comma-separated list of hostname:port for the worker jobs. e.g. \"tf-worker0:2222,tf-worker1:1111\" ')\n",
    "tf.app.flags.DEFINE_integer('task_id', 0, 'Task ID of the worker/replica running the training.')\n",
    "\n",
    "# 和异步模式类似的定义TensorFlow的计算图。唯一的区别在于使用\n",
    "# tf.train.SyncReplicasOptimizer函数处理同步更新。\n",
    "def build_model(x, y_, n_workers, is_chief):\n",
    "    regularizer = tf.contrib.layers.l2_regularizer(REGULARAZTION_RATE)\n",
    "    y = mnist_inference.inference(x, regularizer)\n",
    "    global_step = tf.contrib.framework.get_or_create_global_step()\n",
    "\n",
    "    cross_entropy = tf.nn.sparse_softmax_cross_entropy_with_logits(logits=y, labels=tf.argmax(y_, 1))\n",
    "    cross_entropy_mean = tf.reduce_mean(cross_entropy)\n",
    "    loss = cross_entropy_mean + tf.add_n(tf.get_collection('losses'))\n",
    "    learning_rate = tf.train.exponential_decay(\n",
    "        LEARNING_RATE_BASE,\n",
    "        global_step,\n",
    "        60000 / BATCH_SIZE,\n",
    "        LEARNING_RATE_DECAY)\n",
    "    \n",
    "    # 通过tf.train.SyncReplicasOptimizer函数实现同步更新。\n",
    "    opt = tf.train.SyncReplicasOptimizer(\n",
    "        tf.train.GradientDescentOptimizer(learning_rate),\n",
    "        replicas_to_aggregate=n_workers,\n",
    "        total_num_replicas=n_workers)\n",
    "    sync_replicas_hook = opt.make_session_run_hook(is_chief)\n",
    "    train_op = opt.minimize(loss, global_step=global_step)\n",
    "    \n",
    "    if is_chief:\n",
    "        variable_averages = tf.train.ExponentialMovingAverage(\n",
    "            MOVING_AVERAGE_DECAY, global_step)\n",
    "        variables_averages_op = variable_averages.apply(\n",
    "            tf.trainable_variables())\n",
    "        with tf.control_dependencies([variables_averages_op, train_op]):\n",
    "            train_op = tf.no_op()\n",
    "            \n",
    "    return global_step, loss, train_op, sync_replicas_hook\n",
    "\n",
    "def main(argv=None):\n",
    "    # 和异步模式类似的创建TensorFlow集群。\n",
    "    ps_hosts = FLAGS.ps_hosts.split(',')\n",
    "    worker_hosts = FLAGS.worker_hosts.split(',')\n",
    "    n_workers = len(worker_hosts)\n",
    "    cluster = tf.train.ClusterSpec({\"ps\": ps_hosts, \"worker\": worker_hosts})\n",
    "\n",
    "    server = tf.train.Server(cluster,\n",
    "                             job_name=FLAGS.job_name,\n",
    "                             task_index=FLAGS.task_id)\n",
    "\n",
    "    if FLAGS.job_name == 'ps':\n",
    "        with tf.device(\"/cpu:0\"):\n",
    "            server.join()\n",
    "\n",
    "    is_chief = (FLAGS.task_id == 0)\n",
    "    mnist = input_data.read_data_sets(DATA_PATH, one_hot=True)\n",
    "\n",
    "    device_setter = tf.train.replica_device_setter(\n",
    "        worker_device=\"/job:worker/task:%d\" % FLAGS.task_id,\n",
    "        cluster=cluster)\n",
    "    \n",
    "    with tf.device(device_setter):\n",
    "        x = tf.placeholder(tf.float32, [None, mnist_inference.INPUT_NODE], name='x-input')\n",
    "        y_ = tf.placeholder(tf.float32, [None, mnist_inference.OUTPUT_NODE], name='y-input')\n",
    "        global_step, loss, train_op, sync_replicas_hook = build_model(x, y_, n_workers, is_chief)\n",
    "\n",
    "        # 把处理同步更新的hook也加进来。\n",
    "        hooks=[sync_replicas_hook, tf.train.StopAtStepHook(last_step=TRAINING_STEPS)]\n",
    "        sess_config = tf.ConfigProto(allow_soft_placement=True,\n",
    "                                     log_device_placement=False)\n",
    "\n",
    "        # 训练过程和异步一致。\n",
    "        with tf.train.MonitoredTrainingSession(master=server.target,\n",
    "                                               is_chief=is_chief,\n",
    "                                               checkpoint_dir=MODEL_SAVE_PATH,\n",
    "                                               hooks=hooks,\n",
    "                                               save_checkpoint_secs=60,\n",
    "                                               config=sess_config) as mon_sess:\n",
    "            print(\"session started.\")\n",
    "            step = 0\n",
    "            start_time = time.time()\n",
    "\n",
    "            while not mon_sess.should_stop():                \n",
    "                xs, ys = mnist.train.next_batch(BATCH_SIZE)\n",
    "                _, loss_value, global_step_value = mon_sess.run(\n",
    "                    [train_op, loss, global_step], feed_dict={x: xs, y_: ys})\n",
    "\n",
    "                if step > 0 and step % 100 == 0:\n",
    "                    duration = time.time() - start_time\n",
    "                    sec_per_batch = duration / global_step_value\n",
    "                    format_str = \"After %d training steps (%d global steps), \" +\\\n",
    "                                 \"loss on training batch is %g. (%.3f sec/batch)\"\n",
    "                    print(format_str % (step, global_step_value, loss_value, sec_per_batch))\n",
    "                step += 1\n",
    "\n",
    "if __name__ == \"__main__\":\n",
    "    tf.app.run()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "和异步模式类似，在不同机器上运行以上代码就可以启动TensorFlow集群。在第一个计算服务器上，可以看到与下面类似的输出:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "'''\n",
    "After 100 training steps (49 global steps ), loss on training batch is 1.60499.\n",
    "(0.049 sec/batch)\n",
    "After 200 training steps (99 global steps ), loss on training batch is 1.10667.\n",
    "(0 . 040 sec/batch)\n",
    "After 300 training steps (149 global steps ), loss on training batch is 0.968059.\n",
    "(0 . 036 sec/batch )\n",
    "After 400 training steps (230 global steps ), loss on training batch is 0.833886.\n",
    "(0.035 sec/batch)\n",
    "After 500 training steps (330 global steps ), loss on training batch is 0.846468.\n",
    "(0.032 sec/batch)\n",
    "'''"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "在第二个计算服务器上的输出如下:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "'''\n",
    "After 100 training steps (268 global steps ), loss on training batch is 0.810314.\n",
    "(0.035 sec/batch)\n",
    "After 200 training steps (368 global steps ), loss on training batch is 0.602304.\n",
    "(0.032 sec/batch)\n",
    "After 300 training steps (468 global steps ), loss on training batch is 0.72167.\n",
    "(0.031 sec/batch)\n",
    "After 400 training steps (568 global steps ), loss on training batch is 0.529358.\n",
    "(0.030 sec/batch)\n",
    "After 500 training steps (668 global steps ), loss on training batch is 0.626258.\n",
    "(0.030 sec/batch)\n",
    "'''"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "和异步模式不同，在同步模式下，global_step差不多是两个计算服务器local_step的平均值。比如在第二个计算服务器还没有开始之前，global_step是第一个服务器local_step的一半。这是因为同步模式要求收集replicas_to_aggregate份梯度才会开始更新（注意这里TensorFlow不要求每一份梯度来自不同的计算服务器）。**同步模式不仅仅是一次使用多份梯度，`tf.train.SyncReplicasOptimizer`的实现同时也保证了不会出现陈旧变量的问题,**它会记录每一份梯度是不是由最新的变量值计算得到的，如果不是，那么这一份梯度将会被丢弃。"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.6.4"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
